package com.flink.health;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

import static com.flink.health.MysqlConfig.*;


/**
 * @author Administrator
 */
public class KafkaFlinkMysql {
    /**
     * Job entry point: reads comma-separated log lines from the Kafka topic
     * {@code logs}, counts occurrences per news topic (field index 2), and
     * upserts the running counts into MySQL via {@link MysqlSink}.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Kafka consumer configuration. Record payloads are decoded by the
        // SimpleStringSchema passed to the consumer below, so no Kafka
        // key/value deserializer classes need to be configured here.
        // (The previous "key-serializer"/"value-serializer" entries used
        // non-existent property names and named an interface, not a class —
        // they were silently ignored and have been removed.)
        Properties properties = new Properties();
        properties.setProperty("group.id", "logs");
        properties.setProperty("bootstrap.servers", "172.16.12.148:9092,172.16.12.149:9092");
        properties.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer<String> stringFlinkKafkaConsumer =
                new FlinkKafkaConsumer<>("logs", new SimpleStringSchema(), properties);
        DataStreamSource<String> streamSource = environment.addSource(stringFlinkKafkaConsumer);

        // Keep only well-formed records: exactly 6 comma-separated fields.
        SingleOutputStreamOperator<String> filter =
                streamSource.filter(value -> value.split(",").length == 6);

        // Running count of accesses per news topic.
        SingleOutputStreamOperator<Tuple2<String, Integer>> newsCounts =
                filter.flatMap(new LineSpliter()).keyBy(0).sum(1);

        // Persist the running counts to MySQL.
        newsCounts.addSink(new MysqlSink());

        environment.execute("KafkaFlinkMysql");
    }

    /**
     * JDBC sink that upserts per-topic counts into the {@code news} table:
     * updates the row if the topic name already exists, inserts it otherwise.
     *
     * <p>Uses parameterized {@link PreparedStatement}s prepared once in
     * {@link #open}, fixing several defects in the previous string-built SQL:
     * the SELECT/UPDATE interpolated the name without quotes (syntax error),
     * the UPDATE interpolated the whole tuple instead of the name (never
     * matched a row), values were vulnerable to SQL injection, and statements
     * and result sets were leaked on every invocation.
     */
    public static final class MysqlSink extends RichSinkFunction<Tuple2<String, Integer>> {

        private Connection connection;
        private PreparedStatement selectStatement;
        private PreparedStatement updateStatement;
        private PreparedStatement insertStatement;

        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName(DRIVER_CLASS);
            connection = DriverManager.getConnection(DB_URL, userName, password);
            // Prepare once per task, reuse for every record.
            selectStatement = connection.prepareStatement("select 1 from news where name = ?");
            updateStatement = connection.prepareStatement("update news set count = ? where name = ?");
            insertStatement = connection.prepareStatement("insert into news(name,count) values (?, ?)");
        }

        /**
         * Upserts one (topic, count) pair. Exceptions are allowed to
         * propagate so that Flink fails and restarts the task instead of
         * silently dropping records (the old code swallowed and printed them).
         */
        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            // Strip the surrounding brackets the upstream split leaves on the name.
            String name = value.f0.replaceAll("[\\[\\]]", "");
            int count = value.f1;
            selectStatement.setString(1, name);
            try (ResultSet resultSet = selectStatement.executeQuery()) {
                if (resultSet.next()) {
                    // Row exists: refresh its count.
                    updateStatement.setInt(1, count);
                    updateStatement.setString(2, name);
                    updateStatement.executeUpdate();
                } else {
                    // First time this topic is seen: insert it.
                    insertStatement.setString(1, name);
                    insertStatement.setInt(2, count);
                    insertStatement.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            if (null != selectStatement) {
                selectStatement.close();
            }
            if (null != updateStatement) {
                updateStatement.close();
            }
            if (null != insertStatement) {
                insertStatement.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }

    /**
     * Splits a comma-separated log line and emits a (topic, 1) pair, where the
     * topic is the third field (index 2) of the line. Downstream keyBy/sum
     * aggregates these pairs into per-topic access counts.
     */
    public static final class LineSpliter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] fields = value.split(",");
            // One occurrence of this news topic.
            out.collect(Tuple2.of(fields[2], 1));
        }
    }
}
